package org.apache.hadoop.ipc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.SocketFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputOutputStream;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.Time;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;

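/**
 * ProtobufRpcEngine is the default RpcEngine in current Hadoop, because practically all
 * (99.99%) of Hadoop's communication is implemented with the protobuf protocol.
 * The WritableRpcEngine approach has been marked as deprecated as of Hadoop 3.2.1.
 */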
@InterfaceStability.Evolving
public class ProtobufRpcEngine implements RpcEngine {
  // Logger shared by this engine and its nested classes.
  public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
  
  static { // Register RpcRequestWrapper (request class) and ProtoBufRpcInvoker (invoker) for the RPC_PROTOCOL_BUFFER kind
    org.apache.hadoop.ipc.Server.registerProtocolEngine(
        RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
        new Server.ProtoBufRpcInvoker());
  }

  // Cache from which Invoker instances and getClient() obtain their Client objects.
  private static final ClientCache CLIENTS = new ClientCache();

  /**
   * Creates a client-side proxy for {@code protocol} without a connection
   * retry policy. Delegates to the overload that takes a {@link RetryPolicy},
   * passing {@code null} for it.
   *
   * @throws IOException propagated from the delegated overload
   */
  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
      SocketFactory factory, int rpcTimeout) throws IOException {
    final RetryPolicy noRetryPolicy = null;
    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
        rpcTimeout, noRetryPolicy);
  }

  /**
   * Creates a client-side proxy for {@code protocol} with the given connection
   * retry policy. Delegates to the overload that also takes the
   * {@code fallbackToSimpleAuth} flag, passing {@code null} for it.
   *
   * @throws IOException propagated from the delegated overload
   */
  @Override
  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
      SocketFactory factory, int rpcTimeout,
      RetryPolicy connectionRetryPolicy) throws IOException {
    final AtomicBoolean noFallbackFlag = null;
    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
        rpcTimeout, connectionRetryPolicy, noFallbackFlag);
  }

  /**
   * Builds the client-side dynamic proxy for {@code protocol}.
   *
   * <p>An {@code Invoker} (this engine's {@code InvocationHandler}) is created
   * first. It is then handed to {@link Proxy#newProxyInstance} to create a
   * dynamic proxy implementing the protocol interface, and that proxy is
   * wrapped in a {@link ProtocolProxy} and returned.
   *
   * <p>Because of how Java dynamic proxies work, every call made on the
   * returned proxy object is routed through {@code Invoker.invoke()}.
   *
   * @throws IOException if the {@code Invoker} cannot be constructed
   */
  @Override
  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {
    // Step 1: the handler that turns each interface call into an RPC.
    final Invoker handler = new Invoker(protocol, addr, ticket, conf, factory,
        rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);

    // Step 2: the dynamic proxy itself (core of this method).
    final ClassLoader loader = protocol.getClassLoader();
    final Class<?>[] proxiedInterfaces = {protocol};
    final Object proxyInstance =
        Proxy.newProxyInstance(loader, proxiedInterfaces, handler);

    // Step 3: hand it back wrapped in a ProtocolProxy.
    return new ProtocolProxy<T>(protocol, protocol.cast(proxyInstance), false);
  }
  
  /**
   * Builds a dynamic proxy for {@code ProtocolMetaInfoPB} that talks over the
   * supplied, already-resolved connection id.
   *
   * @param connId  connection id the proxy's {@code Invoker} will use
   * @param conf    configuration used to obtain the underlying client
   * @param factory socket factory used to obtain the underlying client
   * @return the proxy wrapped in a {@link ProtocolProxy}
   */
  @Override
  public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
      ConnectionId connId, Configuration conf, SocketFactory factory)
      throws IOException {
    final Class<ProtocolMetaInfoPB> metaProtocol = ProtocolMetaInfoPB.class;
    final ClassLoader loader = metaProtocol.getClassLoader();
    final Class<?>[] proxiedInterfaces = {metaProtocol};
    final Invoker handler = new Invoker(metaProtocol, connId, conf, factory);
    final ProtocolMetaInfoPB proxyInstance = (ProtocolMetaInfoPB)
        Proxy.newProxyInstance(loader, proxiedInterfaces, handler);
    return new ProtocolProxy<ProtocolMetaInfoPB>(
        metaProtocol, proxyInstance, false);
  }

  /**
   * Client-side invocation handler that turns a call on a protocol proxy into
   * an RPC request.
   *
   * <ul>
   * <li>The request header and the request parameter are protobuf messages.</li>
   * <li>Both are packed into an {@code RpcRequestWrapper} before being handed
   * to {@code Client.call()}.</li>
   * <li>The reply comes back as an {@code RpcResponseWrapper}.</li>
   * <li>The raw response bytes are parsed with protobuf into the method's
   * declared return message type.</li>
   * </ul>
   */
  private static class Invoker implements RpcInvocationHandler {
    /** Cache of default-instance prototypes for return types, keyed by method name. */
    private final Map<String, Message> returnTypes =  new ConcurrentHashMap<String, Message>();
    // NOTE(review): plain (non-volatile) flag; close() is not synchronized.
    private boolean isClosed = false;
    private final Client.ConnectionId remoteId;
    private final Client client;
    private final long clientProtocolVersion;
    private final String protocolName;
    private AtomicBoolean fallbackToSimpleAuth;

    /**
     * Creates an invoker for a remote address; the connection id is derived
     * from the address, protocol, ticket, timeout, retry policy and conf.
     */
    private Invoker(Class<?> protocol, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory,
        int rpcTimeout, RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth) throws IOException {

      this(protocol, Client.ConnectionId.getConnectionId( addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf), conf, factory);
      this.fallbackToSimpleAuth = fallbackToSimpleAuth;
    }

    /**
     * Creates an invoker for an existing connection id. The {@code Client} is
     * obtained from the shared {@code CLIENTS} cache; the protocol name and
     * version are resolved once here and reused for every request header.
     */
    private Invoker(Class<?> protocol, Client.ConnectionId connId, Configuration conf, SocketFactory factory) {
      this.remoteId = connId;
      this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);
      this.protocolName = RPC.getProtocolName(protocol);
      this.clientProtocolVersion = RPC.getProtocolVersion(protocol);
    }

    /**
     * Builds the request header naming the invoked method, the protocol and
     * the client's protocol version.
     */
    private RequestHeaderProto constructRpcRequestHeader(Method method) {
      RequestHeaderProto.Builder builder = RequestHeaderProto.newBuilder();
      builder.setMethodName(method.getName());

      // For protobuf, {@code protocol} used when creating client side proxy is
      // the interface extending BlockingInterface, which has the annotations  such as ProtocolName etc.
      //
      // Using Method.getDeclaringClass(), as in WritableEngine to get at
      // the protocol interface will return BlockingInterface, from where
      // the annotation ProtocolName and Version cannot be obtained.
      //
      // Hence we simply use the protocol class used to create the proxy.
      // For PB this may limit the use of mixins on client side.
      builder.setDeclaringClassProtocolName(protocolName);
      builder.setClientProtocolVersion(clientProtocolVersion);
      return builder.build();
    }

    /**
     * Handles one call made on the protocol proxy. It does three things:
     * <ol>
     * <li>builds the request header that records which method of which
     * protocol is being called;</li>
     * <li>sends the header together with the (already built) request message
     * through {@code Client.call()}, wrapped in an {@code RpcRequestWrapper};</li>
     * <li>parses the returned bytes into the method's return message type and
     * returns it.</li>
     * </ol>
     *
     * @param proxy  the proxy instance the call was made on (unused)
     * @param method the protocol method being invoked
     * @param args   must be exactly {@code [RpcController, Message]}; the
     *               message must not be {@code null}
     * @return the response message parsed from the server's reply
     * @throws ServiceException if the argument list is malformed, the remote
     *         call fails, or the response cannot be parsed; the underlying
     *         failure is preserved as the cause
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws ServiceException {
      // A protobuf blocking-interface method takes exactly two arguments:
      // RpcController + Message. The reflection proxy passes null (not an
      // empty array) for a zero-argument method, so guard against that too.
      final int argCount = (args == null) ? 0 : args.length;
      if (argCount != 2) {
        throw new ServiceException("Wrong number of parameters for request. Method: ["
            + method.getName() + "]" + ", Expected: 2, Actual: "
            + argCount);
      }

      if (args[1] == null) {
        throw new ServiceException("null param while calling Method: [" + method.getName() + "]");
      }

      TraceScope traceScope = null;
      // if Tracing is on then start a new span for this rpc.
      // guard it in the if statement to make sure there isn't any extra string manipulation.
      if (Trace.isTracing()) {
        traceScope = Trace.startSpan(method.getDeclaringClass().getCanonicalName() + "." + method.getName());
      }

      // Build the request header: which method on which protocol.
      RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
      // The request parameter message, e.g. a RenameRequestProto.
      Message theRequest = (Message) args[1];
      final RpcResponseWrapper val;
      try {
        // Send the request through the RPC client (core call).
        val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
            fallbackToSimpleAuth);

      } catch (Throwable e) {
        throw new ServiceException(e);
      } finally {
        if (traceScope != null) {
          traceScope.close();
        }
      }

      Message prototype = null;
      try {
        // Look up the prototype of the return type, e.g. RenameResponseProto.
        prototype = getReturnProtoType(method);
      } catch (Exception e) {
        throw new ServiceException(e);
      }

      Message returnMessage;
      try {
        // Deserialize the raw response bytes into the return message type.
        returnMessage = prototype.newBuilderForType().mergeFrom(val.theResponseRead).build();
      } catch (Throwable e) {
        throw new ServiceException(e);
      }
      return returnMessage;
    }

    /**
     * Releases this invoker's client back to the shared cache. Only the first
     * call has an effect; later calls are no-ops.
     */
    @Override
    public void close() throws IOException {
      if (!isClosed) {
        isClosed = true;
        CLIENTS.stopClient(client);
      }
    }

    /**
     * Returns the default-instance prototype of {@code method}'s return type,
     * obtained reflectively through its static {@code getDefaultInstance()}
     * and cached by method name.
     *
     * @throws Exception if the reflective lookup or invocation fails
     */
    private Message getReturnProtoType(Method method) throws Exception {
      final String methodName = method.getName();
      // ConcurrentHashMap never stores null, so one get() replaces containsKey()+get().
      final Message cached = returnTypes.get(methodName);
      if (cached != null) {
        return cached;
      }

      Class<?> returnType = method.getReturnType();
      Method newInstMethod = returnType.getMethod("getDefaultInstance");
      newInstMethod.setAccessible(true);
      Message prototype = (Message) newInstMethod.invoke(null, (Object[]) null);
      returnTypes.put(methodName, prototype);
      return prototype;
    }

    @Override //RpcInvocationHandler
    public ConnectionId getConnectionId() {
      return remoteId;
    }
  }

  /**
   * A {@link Writable} RPC payload wrapper that can also report a length.
   */
  interface RpcWrapper extends Writable {
    /**
     * Returns the length of this wrapper's content. The implementations in
     * this file return the protobuf serialized size of each part plus the size
     * of its varint32 length prefix, and throw
     * {@link IllegalArgumentException} when the wrapper holds no payload.
     */
    int getLength();
  }
  /**
   * Base wrapper for a protobuf header followed by a protobuf message.
   *
   * Note while this wrapper is writable, the request on the wire is in
   * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server} and
   * {@link RPC} use type Writable as a wrapper to work across multiple
   * RpcEngine kinds.
   *
   * @param <T> concrete protobuf type of the header
   */
  private static abstract class RpcMessageWithHeader<T extends GeneratedMessage>
    implements RpcWrapper {
    T requestHeader;
    Message theRequest; // for clientSide, the request is here
    byte[] theRequestRead; // for server side, the request is here

    public RpcMessageWithHeader() {
    }

    public RpcMessageWithHeader(T requestHeader, Message theRequest) {
      this.requestHeader = requestHeader;
      this.theRequest = theRequest;
    }

    /**
     * Writes the header and then the message, each length-delimited, to
     * {@code out}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
      OutputStream os = DataOutputOutputStream.constructOutputStream(out);

      requestHeader.writeDelimitedTo(os);
      theRequest.writeDelimitedTo(os);
    }

    /**
     * Reads the length-prefixed header and parses it, then reads the message
     * body as raw bytes (left unparsed in {@link #theRequestRead}).
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      requestHeader = parseHeaderFrom(readVarintBytes(in));
      theRequestRead = readMessageRequest(in);
    }

    /** Parses the header bytes into the concrete header type. */
    abstract T parseHeaderFrom(byte[] bytes) throws IOException;

    /**
     * Reads the message body. Subclasses may override, e.g. to skip the body
     * depending on the already-parsed header.
     */
    byte[] readMessageRequest(DataInput in) throws IOException {
      return readVarintBytes(in);
    }

    /**
     * Reads a varint32 length prefix followed by that many bytes.
     *
     * @throws IOException if the stream fails, ends early, or carries a
     *         negative length prefix (malformed or corrupt input)
     */
    private static byte[] readVarintBytes(DataInput in) throws IOException {
      final int length = ProtoUtil.readRawVarint32(in);
      // The prefix comes off the wire; reject a negative value explicitly
      // rather than letting the array allocation fail with an unchecked
      // NegativeArraySizeException.
      if (length < 0) {
        throw new IOException(
            "Invalid negative length prefix in RPC message: " + length);
      }
      final byte[] bytes = new byte[length];
      in.readFully(bytes);
      return bytes;
    }

    public T getMessageHeader() {
      return requestHeader;
    }

    public byte[] getMessageBytes() {
      return theRequestRead;
    }

    /**
     * Returns the size of header plus message, each including its varint32
     * length prefix. Uses {@link #theRequest} when set, otherwise
     * {@link #theRequestRead}.
     *
     * @throws IllegalArgumentException if neither message field is set
     */
    @Override
    public int getLength() {
      int headerLen = requestHeader.getSerializedSize();
      int reqLen;
      if (theRequest != null) {
        reqLen = theRequest.getSerializedSize();
      } else if (theRequestRead != null ) {
        reqLen = theRequestRead.length;
      } else {
        throw new IllegalArgumentException(
            "getLength on uninitialized RpcWrapper");
      }
      return CodedOutputStream.computeRawVarint32Size(headerLen) +  headerLen
          + CodedOutputStream.computeRawVarint32Size(reqLen) + reqLen;
    }
  }

  /**
   * Writable wrapper around a protobuf RPC request.
   *
   * <p>{@code Client.call()} only accepts Writable RPC requests, whereas the
   * {@code Invoker} of this engine serializes requests with protobuf; this
   * class wraps the protobuf request so it can be passed as a Writable.
   *
   * <p>It carries two parts, both serialized with protobuf:
   * <ol>
   * <li>the RPC request header, recording which method was called on which
   * protocol;</li>
   * <li>the request parameter message of that call.</li>
   * </ol>
   * After building one of these, the {@code Invoker} passes it to
   * {@code Client.call()} to be sent to the remote server.
   */
  private static class RpcRequestWrapper
      extends RpcMessageWithHeader<RequestHeaderProto> {

    /** No-arg constructor; fields are filled in later by {@code readFields}. */
    public RpcRequestWrapper() {
    }

    /** Wraps an outgoing header/message pair. */
    public RpcRequestWrapper(RequestHeaderProto requestHeader,
        Message theRequest) {
      super(requestHeader, theRequest);
    }

    @Override
    RequestHeaderProto parseHeaderFrom(byte[] bytes) throws IOException {
      final RequestHeaderProto parsedHeader =
          RequestHeaderProto.parseFrom(bytes);
      return parsedHeader;
    }

    /** Renders as {@code <protocol name>.<method name>}. */
    @Override
    public String toString() {
      final StringBuilder text = new StringBuilder();
      text.append(requestHeader.getDeclaringClassProtocolName());
      text.append(".");
      text.append(requestHeader.getMethodName());
      return text.toString();
    }
  }

  /**
   * Header-plus-message wrapper whose header type is
   * {@link RpcRequestHeaderProto}.
   */
  @InterfaceAudience.LimitedPrivate({"RPC"})
  public static class RpcRequestMessageWrapper
      extends RpcMessageWithHeader<RpcRequestHeaderProto> {

    /** No-arg constructor; fields are filled in later by {@code readFields}. */
    public RpcRequestMessageWrapper() {
    }

    /** Wraps an outgoing header/message pair. */
    public RpcRequestMessageWrapper(RpcRequestHeaderProto requestHeader,
        Message theRequest) {
      super(requestHeader, theRequest);
    }

    @Override
    RpcRequestHeaderProto parseHeaderFrom(byte[] bytes) throws IOException {
      final RpcRequestHeaderProto parsedHeader =
          RpcRequestHeaderProto.parseFrom(bytes);
      return parsedHeader;
    }
  }

  /**
   * Header-plus-message wrapper whose header type is
   * {@link RpcResponseHeaderProto}. When the parsed header reports an ERROR or
   * FATAL status, no message body is read.
   */
  @InterfaceAudience.LimitedPrivate({"RPC"})
  public static class RpcResponseMessageWrapper
      extends RpcMessageWithHeader<RpcResponseHeaderProto> {

    /** No-arg constructor; fields are filled in later by {@code readFields}. */
    public RpcResponseMessageWrapper() {
    }

    /** Wraps an outgoing header/message pair. */
    public RpcResponseMessageWrapper(RpcResponseHeaderProto responseHeader,
        Message theRequest) {
      super(responseHeader, theRequest);
    }

    /**
     * Reads the message body unless the header status is ERROR or FATAL, in
     * which case {@code null} is returned without touching the stream.
     */
    @Override
    byte[] readMessageRequest(DataInput in) throws IOException {
      // error message contain no message body
      final boolean bodyExpected;
      switch (requestHeader.getStatus()) {
        case ERROR:
        case FATAL:
          bodyExpected = false;
          break;
        default:
          bodyExpected = true;
          break;
      }
      if (!bodyExpected) {
        return null;
      }
      return super.readMessageRequest(in);
    }

    @Override
    RpcResponseHeaderProto parseHeaderFrom(byte[] bytes) throws IOException {
      final RpcResponseHeaderProto parsedHeader =
          RpcResponseHeaderProto.parseFrom(bytes);
      return parsedHeader;
    }
  }

  /**
   * Writable wrapper around a protobuf RPC response.
   *
   * <p>{@code Client.call()} returns its response as a Writable, while the
   * server side serializes the response with protobuf; this class wraps the
   * protobuf-serialized response so it can be handled as a Writable.
   */
  @InterfaceAudience.LimitedPrivate({"RPC"}) // temporarily exposed 
  public static class RpcResponseWrapper implements RpcWrapper {
    Message theResponse; // for senderSide, the response is here
    byte[] theResponseRead; // for receiver side, the response is here

    public RpcResponseWrapper() {
    }

    public RpcResponseWrapper(Message message) {
      this.theResponse = message;
    }

    /** Writes the response message, length-delimited, to {@code out}. */
    @Override
    public void write(DataOutput out) throws IOException {
      OutputStream os = DataOutputOutputStream.constructOutputStream(out);
      theResponse.writeDelimitedTo(os);
    }

    /**
     * Reads a varint32 length prefix followed by that many raw response bytes
     * into {@link #theResponseRead}; the bytes are not parsed here.
     *
     * @throws IOException if the stream fails, ends early, or carries a
     *         negative length prefix (malformed or corrupt input)
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      int length = ProtoUtil.readRawVarint32(in);
      // The prefix comes off the wire; reject a negative value explicitly
      // rather than letting the array allocation fail with an unchecked
      // NegativeArraySizeException.
      if (length < 0) {
        throw new IOException(
            "Invalid negative length prefix in RPC response: " + length);
      }
      theResponseRead = new byte[length];
      in.readFully(theResponseRead);
    }

    /**
     * Returns the size of the response including its varint32 length prefix.
     * Uses {@link #theResponse} when set, otherwise {@link #theResponseRead}.
     *
     * @throws IllegalArgumentException if neither field is set
     */
    @Override
    public int getLength() {
      int resLen;
      if (theResponse != null) {
        resLen = theResponse.getSerializedSize();
      } else if (theResponseRead != null ) {
        resLen = theResponseRead.length;
      } else {
        throw new IllegalArgumentException(
            "getLength on uninitialized RpcWrapper");
      }
      return CodedOutputStream.computeRawVarint32Size(resLen) + resLen;
    }
  }

  /**
   * Test hook: obtains a {@code Client} from the shared cache for {@code conf},
   * using the JVM's default socket factory and {@code RpcResponseWrapper} as
   * the response class.
   */
  @VisibleForTesting
  @InterfaceAudience.Private
  @InterfaceStability.Unstable
  static Client getClient(Configuration conf) {
    final SocketFactory defaultFactory = SocketFactory.getDefault();
    return CLIENTS.getClient(conf, defaultFactory, RpcResponseWrapper.class);
  }
  
 

  /**
   * Creates the protobuf RPC server for {@code protocol}, serving calls with
   * {@code protocolImpl}. All arguments are forwarded unchanged to the nested
   * {@code Server} constructor.
   *
   * @throws IOException if the server cannot be constructed
   */
  @Override
  public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
      String bindAddress, int port, int numHandlers, int numReaders,
      int queueSizePerHandler, boolean verbose, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      String portRangeConfig) throws IOException {
    final Server protobufServer = new Server(protocol, protocolImpl, conf,
        bindAddress, port, numHandlers, numReaders, queueSizePerHandler,
        verbose, secretManager, portRangeConfig);
    return protobufServer;
  }

  /**
   * {@code RPC.Server} subclass for the protobuf RPC kind. The constructor
   * registers the protocol and its implementation under
   * {@code RPC_PROTOCOL_BUFFER}; incoming requests of that kind are answered
   * by {@link ProtoBufRpcInvoker#call}.
   */
  public static class Server extends RPC.Server {

    /**
     * Construct an RPC server.
     * @param protocolClass the class of protocol
     * @param protocolImpl the protocolImpl whose methods will be called
     * @param conf the configuration to use
     * @param bindAddress the address to bind on to listen for connection
     * @param port the port to listen for connections on
     * @param numHandlers the number of method handler threads to run
     * @param numReaders forwarded to the {@code RPC.Server} constructor
     * @param queueSizePerHandler forwarded to the {@code RPC.Server} constructor
     * @param verbose whether each call should be logged
     * @param secretManager forwarded to the {@code RPC.Server} constructor
     * @param portRangeConfig A config parameter that can be used to restrict
     * the range of ports used when port is 0 (an ephemeral port)
     * @throws IOException if the underlying {@code RPC.Server} cannot be created
     */
    public Server(Class<?> protocolClass, Object protocolImpl, Configuration conf, String bindAddress, int port, int numHandlers,
        int numReaders, int queueSizePerHandler, boolean verbose, SecretManager<? extends TokenIdentifier> secretManager, String portRangeConfig) throws IOException {
      super(bindAddress, port, null, numHandlers, numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl .getClass().getName()), secretManager, portRangeConfig);
      this.verbose = verbose;
      registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, protocolImpl);
    }

    /**
     * Server-side invoker that answers protobuf RPC requests.
     *
     * <ul>
     * <li>The incoming request arrives as an {@code RpcRequestWrapper} (header
     * plus raw request bytes).</li>
     * <li>The request bytes are deserialized with protobuf and the registered
     * service implementation is called.</li>
     * <li>The service's response message is returned wrapped in an
     * {@code RpcResponseWrapper}.</li>
     * </ul>
     */
    static class ProtoBufRpcInvoker implements RpcInvoker {

      /**
       * Looks up the implementation registered for the given protocol name and
       * client version.
       *
       * @throws RpcServerException {@code RpcNoSuchProtocolException} if the
       *         protocol is unknown, or {@code RPC.VersionMismatch} if the
       *         protocol is known but not at the client's version
       */
      private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server, String protoName, long clientVersion) throws RpcServerException {
        ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
        ProtoClassProtoImpl impl =  server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
        if (impl == null) { // no match for Protocol AND Version
          VerProtocolImpl highest =  server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,  protoName);
          if (highest == null) {
            throw new RpcNoSuchProtocolException( "Unknown protocol: " + protoName);
          }
          // protocol supported but not the version that client wants
          throw new RPC.VersionMismatch(protoName, clientVersion, highest.version);
        }
        return impl;
      }

      /**
       * This is a server side method, which is invoked over RPC. On success
       * the return response has protobuf response payload. On failure, the
       * exception name and the stack trace are returned in the response.
       * See {@code HadoopRpcResponseProto}.
       *
       * In this method there are three types of exceptions possible and they
       * are returned in response as follows.
       * <ol>
       * <li> Exceptions encountered in this method that are returned
       * as {@link RpcServerException} </li>
       * <li> Exceptions thrown by the service wrapped in ServiceException.
       * In that case this method rethrows the wrapped cause; if the
       * ServiceException has no cause, or its cause is not an
       * {@link Exception}, the ServiceException itself is rethrown.</li>
       * <li> Other exceptions thrown by the service. They are returned as
       * it is.</li>
       * </ol>
       *
       * @param server the RPC server holding the registered implementations
       *        and the metrics objects
       * @param protocol protocol name, used in log messages only
       * @param writableRequest the request; must be an {@code RpcRequestWrapper}
       * @param receiveTime time the request was received, used to compute the
       *        queue time against {@code Time.now()}
       * @return the service's response wrapped in an {@code RpcResponseWrapper}
       */
      @Override
      public Writable call(RPC.Server server, String protocol, Writable writableRequest, long receiveTime) throws Exception {
        RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
        RequestHeaderProto rpcRequest = request.requestHeader;
        String methodName = rpcRequest.getMethodName();
        String protoName = rpcRequest.getDeclaringClassProtocolName();
        long clientVersion = rpcRequest.getClientProtocolVersion();
        if (server.verbose) {
          LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
        }

        ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName, clientVersion);
        BlockingService service = (BlockingService) protocolImpl.protocolImpl;
        MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
        if (methodDescriptor == null) {
          String msg = "Unknown method " + methodName + " called on " + protocol + " protocol.";
          LOG.warn(msg);
          throw new RpcNoSuchMethodException(msg);
        }
        // Deserialize the raw request bytes into the method's request type.
        Message prototype = service.getRequestPrototype(methodDescriptor);
        Message param = prototype.newBuilderForType().mergeFrom(request.theRequestRead).build();

        Message result;
        long startTime = Time.now();
        int qTime = (int) (startTime - receiveTime);
        Exception exception = null;
        try {
          server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
          result = service.callBlockingMethod(methodDescriptor, null, param);
        } catch (ServiceException e) {
          // Unwrap so the caller sees the service's own exception. Fall back
          // to the ServiceException itself when there is no cause or the cause
          // is not an Exception; a blind cast would otherwise surface as a
          // NullPointerException / ClassCastException and the failure would be
          // recorded in the metrics as a successful call.
          Throwable cause = e.getCause();
          exception = (cause instanceof Exception) ? (Exception) cause : e;
          throw exception;
        } catch (Exception e) {
          exception = e;
          throw e;
        } finally {
          // Runs for both success and failure: log and record timings.
          int processingTime = (int) (Time.now() - startTime);
          if (LOG.isDebugEnabled()) {
            String msg = "Served: " + methodName + " queueTime= " + qTime + " procesingTime= " + processingTime;
            if (exception != null) {
              msg += " exception= " + exception.getClass().getSimpleName();
            }
            LOG.debug(msg);
          }
          // Failed calls are recorded under the exception's simple class name.
          String detailedMetricsName = (exception == null) ? methodName : exception.getClass().getSimpleName();
          server.rpcMetrics.addRpcQueueTime(qTime);
          server.rpcMetrics.addRpcProcessingTime(processingTime);
          server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName, processingTime);
        }
        return new RpcResponseWrapper(result);
      }
    }
  }
}
